package cn.edu360.beans

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}


/**
 * Spark job that converts a comma-separated ad log file into one Parquet file.
 *
 * Each input line is split on `,`, typed according to [[AdLogSchema]] and written
 * with snappy compression. Lines with fewer columns than the schema, or with a
 * numeric column that cannot be parsed, are skipped.
 *
 * Usage: `dmp [inputPath [outputPath]]` - both paths are optional and fall back
 * to the defaults declared below.
 */
object dmp {

  /** Input file read when no path is passed on the command line. */
  private val DefaultInputPath = "D:/2016-10-01_06_p1_invalid.1475274123982.log.FINISH"

  /** Output directory written when no path is passed on the command line. */
  private val DefaultOutputPath = "d:/ParquetFile"

  /**
   * Column layout of one log record, in file order.
   * Each entry is (field name, type); nullability is left at StructField's default.
   */
  private val AdLogSchema: StructType = StructType(
    Array(
      StructField("sessionid", StringType),
      StructField("advertisersid", IntegerType),
      StructField("adorderid", IntegerType),
      StructField("adcreativeid", IntegerType),
      StructField("adplatformproviderid", IntegerType),
      StructField("sdkversion", StringType),
      StructField("adplatformkey", StringType),
      StructField("putinmodeltype", IntegerType),
      StructField("requestmode", IntegerType),
      StructField("adprice", DoubleType),
      StructField("adppprice", DoubleType),
      StructField("requestdate", StringType),
      StructField("ip", StringType),
      StructField("appid", StringType),
      StructField("appname", StringType),
      StructField("uuid", StringType),
      StructField("device", StringType),
      StructField("client", IntegerType),
      StructField("osversion", StringType),
      StructField("density", StringType),
      StructField("pw", IntegerType),
      StructField("ph", IntegerType),
      StructField("long", StringType),
      StructField("lat", StringType),
      StructField("provincename", StringType),
      StructField("cityname", StringType),
      StructField("ispid", IntegerType),
      StructField("ispname", StringType),
      StructField("networkmannerid", IntegerType),
      StructField("networkmannername", StringType),
      StructField("iseffective", IntegerType),
      StructField("isbilling", IntegerType),
      StructField("adspacetype", IntegerType),
      StructField("adspacetypename", StringType),
      StructField("devicetype", IntegerType),
      StructField("processnode", IntegerType),
      StructField("apptype", IntegerType),
      StructField("district", StringType),
      StructField("paymode", IntegerType),
      StructField("isbid", IntegerType),
      StructField("bidprice", DoubleType),
      StructField("winprice", DoubleType),
      StructField("iswin", IntegerType),
      StructField("cur", StringType),
      StructField("rate", DoubleType),
      StructField("cnywinprice", DoubleType),
      StructField("imei", StringType),
      StructField("mac", StringType),
      StructField("idfa", StringType),
      StructField("openudid", StringType),
      StructField("androidid", StringType),
      StructField("rtbprovince", StringType),
      StructField("rtbcity", StringType),
      StructField("rtbdistrict", StringType),
      StructField("rtbstreet", StringType),
      StructField("storeurl", StringType),
      StructField("realip", StringType),
      StructField("isqualityapp", IntegerType),
      StructField("bidfloor", DoubleType),
      StructField("aw", IntegerType),
      StructField("ah", IntegerType),
      StructField("imeimd5", StringType),
      StructField("macmd5", StringType),
      StructField("idfamd5", StringType),
      StructField("openudidmd5", StringType),
      StructField("androididmd5", StringType),
      StructField("imeisha1", StringType),
      StructField("macsha1", StringType),
      StructField("idfasha1", StringType),
      StructField("openudidsha1", StringType),
      StructField("androididsha1", StringType),
      StructField("uuidunknow", StringType),
      StructField("userid", StringType),
      StructField("iptype", IntegerType),
      StructField("initbidprice", DoubleType),
      StructField("adpayment", DoubleType),
      StructField("agentrate", DoubleType),
      StructField("lomarkrate", DoubleType),
      StructField("adxrate", DoubleType),
      StructField("title", StringType),
      StructField("keywords", StringType),
      StructField("tagid", StringType),
      StructField("callbackdate", StringType),
      StructField("channelid", StringType),
      StructField("mediatype", IntegerType)
    ))

  /** Column types in file order; drives the per-column conversion in [[toRow]]. */
  private val FieldTypes: Array[DataType] = AdLogSchema.fields.map(_.dataType)

  /**
   * Converts one raw column to the value stored in the Row.
   *
   * @param raw      column text exactly as it appears in the log line
   * @param dataType target type taken from [[AdLogSchema]]
   * @return the typed value, or None when a numeric column is not a valid number
   */
  private def parseField(raw: String, dataType: DataType): Option[Any] = {
    // NOTE(review): an empty column becomes "0" for every type, so empty string
    // columns are stored as the text "0". Kept as-is because existing output
    // may depend on it - confirm whether string columns should stay empty.
    val text = if (raw.isEmpty) "0" else raw
    dataType match {
      case IntegerType => scala.util.Try(text.toInt).toOption
      case DoubleType  => scala.util.Try(text.toDouble).toOption
      case _           => Some(text)
    }
  }

  /**
   * Parses one log line into a Row matching [[AdLogSchema]].
   *
   * @param line one comma-separated log record
   * @return the typed Row, or None when the line has too few columns or a
   *         numeric column cannot be parsed
   */
  private def toRow(line: String): Option[Row] = {
    // limit -1 keeps trailing empty columns so the column count stays meaningful
    val columns = line.split(",", -1)
    if (columns.length < FieldTypes.length) {
      None
    } else {
      // Columns beyond the schema width are ignored.
      val parsed = FieldTypes.indices.map(i => parseField(columns(i), FieldTypes(i)))
      if (parsed.forall(_.isDefined)) Some(Row.fromSeq(parsed.flatten)) else None
    }
  }

  /**
   * Entry point.
   *
   * @param args optional `inputPath` and `outputPath`; missing values fall back
   *             to [[DefaultInputPath]] and [[DefaultOutputPath]]
   */
  def main(args: Array[String]): Unit = {
    val (inputPath, outputPath) = args match {
      case Array(in, out, _*) => (in, out)
      case Array(in)          => (in, DefaultOutputPath)
      case _                  => (DefaultInputPath, DefaultOutputPath)
    }

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("互联网广告")
      // Serialization format [rdd] [worker]
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Parquet compression codec; keeps the output relatively small
      .set("spark.sql.parquet.compression.codec", "snappy")

    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      val lines: RDD[String] = sc.textFile(inputPath)
      val rows: RDD[Row] = lines.flatMap(line => toRow(line).iterator)
      val adLogs: DataFrame = sqlContext.createDataFrame(rows, AdLogSchema)
      // coalesce(1) so the result is written as a single part file
      adLogs.coalesce(1).write.mode(SaveMode.Overwrite).parquet(outputPath)
    } finally {
      // Always release the context, even when reading or writing fails.
      sc.stop()
    }
  }

}
